package net.wicp.tams.duckula.task.disruptor;

import java.io.File;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.alibaba.fastjson.JSONObject;
import com.lmax.disruptor.EventHandler;

import lombok.extern.slf4j.Slf4j;
import net.wicp.tams.common.Plugin;
import net.wicp.tams.common.apiext.IOUtil;
import net.wicp.tams.common.apiext.LoggerUtil;
import net.wicp.tams.common.constant.JvmStatus;
import net.wicp.tams.duckula.common.beans.Pos;
import net.wicp.tams.duckula.common.constant.SerializerEnum;
import net.wicp.tams.duckula.plugin.IReceive;
import net.wicp.tams.duckula.plugin.ISerialize;
import net.wicp.tams.duckula.plugin.Utils;
import net.wicp.tams.duckula.task.Main;
import net.wicp.tams.duckula.task.bean.EventPackage;

/***
 * 此为单线程任务，要保证此线程不出异常
 * 
 * @author rjzjh
 *
 */
@Slf4j
public class DisruptorSendHandler implements EventHandler<EventPackage> {
	private static final org.slf4j.Logger errorBinlog = org.slf4j.LoggerFactory.getLogger("errorBinlog");
	private final IReceive receive;
	private final ISerialize serialize;
	private final ExecutorService exec;
	private final int sendAll = 5;// 总共重试次数
	public static File rootDir;
	static {
		rootDir = new File(System.getenv("DUCKULA_HOME"));
	}

	public DisruptorSendHandler(JSONObject params) {
		Thread.currentThread().setName("zorro-sendHandler");
		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

		Plugin serializerPlugin = null;
		SerializerEnum serializerEnum = Main.context.getTask().getSerializerEnum();
		if (serializerEnum != null && serializerEnum != SerializerEnum.no) {
			serializerPlugin = Utils.newPlugin(
					IOUtil.mergeFolderAndFilePath(rootDir.getPath(),
							Main.context.getTask().getSerializerEnum().getPluginJar()),
					"net.wicp.tams.duckula.plugin.ISerialize", classLoader, "net.wicp.tams.duckula.plugin.ISerialize");
			Thread.currentThread().setContextClassLoader(serializerPlugin.getLoad().getClassLoader());// 需要加载前设置好classload
			this.serialize = Utils.loadSerialize(serializerPlugin);
		} else {
			this.serialize = null;
		}

		Plugin plugin = Utils.newPlugin(
				IOUtil.mergeFolderAndFilePath(rootDir.getPath(), Main.context.getTask().getReceivePluginDir()),
				"net.wicp.tams.duckula.plugin.IReceive",
				serializerPlugin == null ? classLoader : serializerPlugin.getLoad().getClassLoader(),
				"net.wicp.tams.duckula.plugin.IReceive", "net.wicp.tams.duckula.plugin.ReceiveAbs");
		Thread.currentThread().setContextClassLoader(plugin.getLoad().getClassLoader());// 需要加载前设置好classload
		this.receive = Utils.loadReceive(plugin, params);
		if (receive == null) {
			log.error("加载接收器失败");
			LoggerUtil.exit(JvmStatus.s15);
		}
		exec = Executors.newFixedThreadPool(1);

	}

	private Pos curPos = null;

	@Override
	public void onEvent(final EventPackage event, long sequence, boolean endOfBatch) throws Exception {
		if (event.isXid()) {
			// curPos==null 发生在没有更新数据，却有表结构修改的情况下
			if (curPos != null) {
				// 更新最新gtid
				curPos.setIshalf(false);
				Main.context.setPos(curPos);
			}
			return;
		}
		// 真正发送
		int sendNum = 0;
		int timeout = 1;

		while (true) {
			Future<Boolean> future = exec.submit(new Callable<Boolean>() {
				public Boolean call() throws Exception {
					if (serialize == null) {
						return receive.receiveMsg(event, event.getRule());
					} else {
						return receive.receiveMsg(serialize.serialize(event, event.getRule().getSplitKey()),
								event.getRule());
					}
				}
			});
			timeout = timeout * 2;
			Boolean retvalue = false;
			try {
				retvalue = future.get(timeout, TimeUnit.SECONDS);// TimeUnit.SECONDS
			} catch (TimeoutException e) {
				future.cancel(true);
			} catch (Throwable th) {
				log.error("接收器代码异常，请检查并修改接收器.发送内容：" + event, th);
				LoggerUtil.exit(JvmStatus.s15);
			}

			if (retvalue != null && retvalue.booleanValue()) {
				updatePos(event);
				break;
			} else if (sendNum < sendAll) {
				log.error("第[{}]次发送失败，等待时间：[{}].请联系相关人员。", sendNum, timeout);
				sendNum++;
			} else {
				if (Main.context.isSync()) {
					log.error("总共发送[{}]次失败，发送内容[{}] 系统将停止此服务，在另一台机上重试发送", sendAll, event);
					LoggerUtil.exit(JvmStatus.s15);
				} else {
					updatePos(event);
					errorBinlog.error(curPos.toString());
					break;
				}
			}
		}
	}

	private void updatePos(final EventPackage event) {
		curPos = event.getPos();
		Main.context.getPos().setFileName(event.getPos().getFileName());
		Main.context.getPos().setPos(event.getPos().getPos());		
		Main.context.getPos().setTime(event.getPos().getTime());
		Main.context.getPos().setMasterServerId(event.getPos().getMasterServerId());
		Main.context.getPos().setIshalf(true);
		// 增加统计
		Main.metric.counter_ringbuff_pack.dec();
		Main.metric.counter_ringbuff_event.dec(event.getRowsNum());
		Main.metric.meter_sender_pack.mark();
		Main.metric.meter_sender_event.mark(event.getRowsNum());

		switch (event.getEventTable().getOptType()) {
		case insert:
			Main.metric.meter_sender_event_add.mark(event.getRowsNum());
			break;

		case delete:
			Main.metric.meter_sender_event_del.mark(event.getRowsNum());
			break;

		case update:
			Main.metric.meter_sender_event_update.mark(event.getRowsNum());
			break;

		default:
			break;
		}

	}
}
